package com.dahuan.window;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Flink streaming job that reads comma-separated sensor readings from a socket
 * and prints the average temperature per sensor id over a sliding count window.
 *
 * <p>Each input line is expected to have the form {@code id,timestamp,temperature}.
 */
public class Window_CountFunction {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        DataStream<String> inputStream = env.socketTextStream( "localhost", 7777 );

        // Parse "id,timestamp,temperature" lines into SensorReading objects.
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split( "," );
            // valueOf replaces the deprecated boxed constructors; trimming lets
            // input such as "sensor_1, 1547718199, 35.8" parse instead of failing
            // with a NumberFormatException on the leading space.
            return new SensorReading(
                    fields[0],
                    Long.valueOf( fields[1].trim() ),
                    Double.valueOf( fields[2].trim() ) );
        } );

        // Count-window test: per-key window of size 10 that slides every 2 elements.
        DataStream<Double> avgStream = dataStream.keyBy( "id" )
                .countWindow( 10, 2 )
                .aggregate( new MyAvgTemp() );

        avgStream.print();
        env.execute( "Window_CountFunction" );
    }

    /**
     * Incrementally computes the average temperature of the readings it receives.
     *
     * <p>The accumulator is a pair: {@code f0} holds the running temperature sum
     * and {@code f1} holds the number of readings added so far.
     */
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {

        /**
         * Creates the initial accumulator.
         *
         * @return an accumulator with sum {@code 0.0} and count {@code 0}
         */
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>( 0.0, 0 );
        }

        /**
         * Adds one reading to the accumulator.
         *
         * @param value       the reading whose temperature is added
         * @param accumulator the current (sum, count) pair
         * @return a new accumulator with the temperature added to the sum and the
         *         count increased by one
         */
        @Override
        public Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {
            double sum = accumulator.f0 + value.getTemperature();
            int count = accumulator.f1 + 1;
            return new Tuple2<>( sum, count );
        }

        /**
         * Computes the average from the accumulator.
         *
         * @param accumulator the (sum, count) pair
         * @return the sum divided by the count; the division is done in floating
         *         point, so a zero count does not throw
         */
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        /**
         * Combines two accumulators by adding their sums and their counts.
         *
         * @param a the first (sum, count) pair
         * @param b the second (sum, count) pair
         * @return a new accumulator holding the combined sum and count
         */
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>( a.f0 + b.f0, a.f1 + b.f1 );
        }
    }
}
